Workflowsが多重起動されないようにする方法をCloud Run Functionsを用いる方法で考えてみた

Workflowsが多重起動されないようにする方法をCloud Run Functionsを用いる方法で考えてみた

Clock Icon2025.01.13

はじめに

データパイプライン構築などでWorkflowsを用いている場合は同じワークフローを多重起動することを避けたい場合があると思います。多重起動を防止するためにはFirestoreなど何らかのDBに記録しておく方法がよくあると思いますが、
ワークロードによっては使用するリソースを増やしたくない時もあると思います。
以前のブログではWorkflowsのWorkflowsコネクタを用いてWorkflowsのワークフロー内で多重起動する方法を考えて試してみました。
https://dev.classmethod.jp/articles/20241212-workflows-concurrent-execution/

しかしながら上記の方法ではミリ秒の差で起動された場合は正しく重複起動判定ができませんでした。

そこで今回はミリ秒の差で起動された場合でも正しく重複起動判定ができるような実装を考えてみました。

概要

指定したワークフローの実行結果一覧を取得できるprojects.locations.workflows.executions.listAPIを用います。
https://cloud.google.com/workflows/docs/reference/executions/rest/v1/projects.locations.workflows.executions/list#google.cloud.workflows.executions.v1.Executions.ListExecutions

ListAPIの概要

ListAPIは引数に指定した名前のワークフローの実行履歴を返却します。また返却される値は実行時間の降順(新しい順)となっています。
以下が代表的な引数です。

引数 説明
parent 必須。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名の形式で指定する
pageSize 返却される実行履歴の最大数。pageSizeを設定しなかった場合は100。viewに設定した値で設定できる最大値は異なる
pageToken ページネーションを行う場合はpageTokenを用いる
view 返却されるフィールド値を制御する引数。EXECUTION_VIEW_UNSPECIFIEDBASICFULLから選択。省略した場合はBASICがデフォルト値。BASICの場合はnamestart_timeend_timestateworkflow_revision_idが返却される。

代表的なレスポンスパラメータを以下に示します。

パラメータ 説明
name 実行履歴名。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ワークフロー実行IDの形式。ワークフロー実行IDとほぼ同義
state 実行状態。今回の肝なので返却値のパターンは別表
startTime ワークフロー実行開始時間
workflowRevisionId 実行されたワークフローのリビジョン

上表の実行状態(state)が返却する値は以下です。

state 説明
STATE_UNSPECIFIED 無効な状態
ACTIVE ワークフロー実行中
SUCCEEDED ワークフローが実行に成功して終了
FAILED ワークフローの実行に失敗
CANCELLED ワークフローの実行がキャンセル

どうやって重複起動か判断するか

ListAPIで取得した実行結果一覧のなかで実行中のワークフローの開始時間(startTime)を重複起動判定時の条件に加えます。
判定対象のワークフロー実行が、ACTIVEなワークフロー実行かつ最も古い開始時間の実行ではない場合は重複起動と判断し、ワークフローを実行しないようにすればよいと考えました。
言い換えるとACTIVEなワークフローで最も古い開始時間の場合は実行を継続して問題ない、と判断します。

開始時間をもとに判定をしたいので、pythonで判定処理を実装してCloud Run FunctionsにデプロイしてWorkflowsからはHTTPリクエストで呼び出すようにしました。
Workflows(YAML上)で開始時間の比較を行うことが難しいためです。

判定用スクリプト

何はともあれスクリプト全文です。今回はWorkflowsのpythonクライアントライブラリを使用しました。

import functions_framework
from google.cloud.workflows.executions_v1 import ExecutionsClient
from google.cloud.workflows.executions_v1.types import ListExecutionsRequest, Execution
from flask import jsonify, abort

def can_execute_workflow(project_num: str, location: str, workflow_name: str, current_execution_name: str):
    """
    指定されたワークフローの多重起動判定関数
    現在の実行がACTIVE状態の中で最も開始時間が古いかどうかを判定

    Args:
        project_num (str): プロジェクト番号
        location (str): ワークフローがデプロイされているリージョン
        workflow_name (str): ワークフローの名前
        current_execution_name (str): 現在の実行名

    Returns:
        dict: 実行可否と関連情報を含むレスポンス
            - "can_execute" (bool): 実行可能であればTrue
            - "message" (str): 実行可否の理由
            - "latest_active_execution" (dict or None): 現在のACTIVEな実行の中で最も古い実行の情報
    """
    client = ExecutionsClient()
    parent = f"projects/{project_num}/locations/{location}/workflows/{workflow_name}"

    try:
        # 実行リストを取得
        request = ListExecutionsRequest(parent=parent)
        response = client.list_executions(request=request)

        # ACTIVE状態の実行をフィルタリング
        active_executions = [
            execution for execution in response.executions
            if execution.state == Execution.State.ACTIVE  # Enumの値と比較
        ]

        # ACTIVE状態の実行がない場合実行可能
        if not active_executions:
            return {
                "can_execute": True,
                "message": "No ACTIVE executions found. Safe to execute.",
                "latest_active_execution": None,
            }

        # 最も開始時間が古いACTIVEな実行を取得
        oldest_execution = min(
            active_executions,
            key=lambda execution: execution.start_time
        )

        # 現在の実行が最も古いかどうか判定
        if oldest_execution.name == current_execution_name:
            return {
                "can_execute": True,
                "message": "Current execution is the oldest ACTIVE execution. Safe to execute.",
                "latest_active_execution": {
                    "name": oldest_execution.name,
                    "state": Execution.State(oldest_execution.state).name,
                    "start_time": oldest_execution.start_time,
                },
            }
        else:
            return {
                "can_execute": False,
                "message": "Current execution is not the oldest ACTIVE execution. Do not execute.",
                "latest_active_execution": {
                    "name": oldest_execution.name,
                    "state": Execution.State(oldest_execution.state).name,
                    "start_time": oldest_execution.start_time,
                },
            }

    except Exception as e:
        return {
            "can_execute": False,
            "message": f"Error occurred while checking executions: {e}",
            "latest_active_execution": None,
        }

# Cloud Functions HTTPエントリポイント
@functions_framework.http
def check_workflows_run(request):
    """
    Cloud FunctionsのHTTPエントリポイント
    JSONリクエストを受け取り、ワークフローの実行可否を判定

    Args:
        request (flask.Request): HTTPリクエストオブジェクト

    Returns:
        flask.Response: 実行可否の結果を含むJSONレスポンス
    """
    try:
        # JSONデータを取得
        data = request.get_json()
        if not data:
            abort(400, description="Invalid JSON payload")

        # 必須フィールドのチェック
        required_fields = ["project_num", "location", "workflow_name", "execution_id"]
        for field in required_fields:
            if field not in data:
                abort(400, description=f"Missing required field: {field}")

        # 必要な情報を取得
        project_num = data['project_num']
        location = data['location']
        workflow_name = data['workflow_name']
        execution_id = data['execution_id']
        current_execution_name = f"projects/{project_num}/locations/{location}/workflows/{workflow_name}/executions/{execution_id}"

        # 実行可否を判定
        result = can_execute_workflow(project_num, location, workflow_name, current_execution_name)

        # 実行可能の場合は200を返す
        if result["can_execute"]:
            return jsonify(result), 200

        # 実行不可の場合は503を返す
        return jsonify(result), 503

    except Exception as e:
        return jsonify({"error": f"Unexpected error: {e}"}), 500

処理の概要を以下に解説します。
can_execute_workflow 関数
ワークフローの実行リストを取得し、現在の実行が「最も古いACTIVE状態の実行」であるかどうか判定

引数

  • project_num (str): プロジェクト番号
  • location (str): ワークフローがデプロイされているリージョン
  • workflow_name (str): ワークフローの名前
  • current_execution_name (str): 現在の実行名(APIリクエストに必要な形式で表現)

処理フロー

  1. Workflows API の呼び出し:

    • ExecutionsClient を使用して、指定されたワークフローの実行リストを取得
  2. ACTIVE 状態の実行をフィルタリング:

    • 実行リストの中から、Execution.State.ACTIVEの状態を持つものだけを抽出(=実行状態がACTIVE)
  3. ACTIVE 実行が存在しない場合:

    • 判定対象ワークフロー実行が実行可能であると判断し、can_execute=True を返却
  4. 最も古い ACTIVE 実行の判定:

    • start_time を基準に、最も古い ACTIVE 実行を取得
  5. 現在の実行が最も古いか判定:

    • 現在の実行が最も古い場合は can_execute=Trueを返却(実行OK)、それ以外の場合は can_execute=False(実行NG)を返却

戻り値

  • can_execute (bool): 実行可能であれば True、それ以外は False
  • message (str): 実行可否の判定結果
  • latest_active_execution (dict or None): 最も古い ACTIVEな実行の情報(名前、状態、開始時刻)

check_workflows_run関数
Cloud Run FunctionsのHTTPエントリポイント。リクエストを受け取り、can_execute_workflow 関数を呼び出して結果を返却する

処理フロー

  1. リクエストの検証:

    • リクエストボディが有効な JSON 形式であるかを確認
    • 必須フィールド(project_numlocationworkflow_nameexecution_id)が存在するかをチェック
  2. 実行可否の判定:

    • can_execute_workflow 関数を呼び出し、実行可否を判定
  3. レスポンスの生成:

    • 実行可能な場合は、200 OKを返却
    • 実行不可の場合は、503 Service Unavailableを返却
    • 処理中にエラーが発生した場合は、500 Internal Server Errorを返却

requirements.txtは以下です。

requirements.txt
functions-framework==3.*
google-cloud-workflows

テスト用のWorkflows

- init:
    assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - project_num: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
        - workflow_exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
        - workflow_name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
        - workflow_location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
        - functions_url: "作成したCloud Run FunctionsのURL"
- checkRunsStep:
    call: http.post
    args:
        url: ${functions_url}
        auth:
            type: OIDC
        body:
            project_num: ${project_num}
            location: ${workflow_location}
            workflow_name: ${workflow_name}
            execution_id: ${workflow_exec_id}
        timeout: 500
    result: processResult
- sleepStep:
    call: sys.sleep
    args:
        seconds: 30
- endStep:
    return: ${processResult}

プロジェクト番号、リージョン、ワークフロー名、ワークフローの実行IDを引数にCloud Run Functions関数を呼び出します。
チェック結果でOKとなれば実行が継続しますし、NG(多重起動)と判定された場合は503エラーが返ってくるのでワークフロー実行は例外発生で終了します。
※30秒sleep処理を入れていますが、これは実装時にテストのため使っていました。30秒のsleep処理の間に手で再度ワークフローを再実行して多重起動判定されるかどうかをみたりなど・・・不要なので消しても大丈夫です。

テスト用のシェルスクリプト

ミリ秒レベルでワークフロー実行したいのでシェルで並列起動するようにしてみました。

#!/bin/bash

# ワークフロー名とリージョンを設定
WORKFLOW_NAME="ワークフロー名"
LOCATION="asia-northeast1"

# 5回並列実行
for i in {1..5}; do
  echo "Executing workflow $i..."
  gcloud workflows execute "$WORKFLOW_NAME" --location="$LOCATION" &
done
# 全てのバックグラウンドプロセスが完了するのを待つ
wait
echo "All executions completed."

これを実行すればミリ秒レベルでワークフローが実行できます(端末のスペックにもよります・・・)

いざ、テストしてみる

シェルスクリプトを実行します。

sh xxx.sh

というような感じで。

ワクワクしながら結果を見てみます。

gcloud workflows executions list "ワークフロー名" --location=asia-northeast1

5件の結果が返ってきています。観察します。

Execution ID State Start Time End Time
bdbf9d03-a0a2-4afe-9673-da596e618841 FAILED 2025-01-13T13:56:46.039515471Z 2025-01-13T13:56:48.640074167Z
1a5cd146-2d41-4a6e-9b5c-2a99f392956e FAILED 2025-01-13T13:56:46.039425206Z 2025-01-13T13:56:48.518246471Z
cc3f1cd3-2300-465f-bd4f-dcaf5886fe0c FAILED 2025-01-13T13:56:46.039418308Z 2025-01-13T13:56:48.665984647Z
61035a2a-dbec-49d5-a6fc-63c4fe4c5573 FAILED 2025-01-13T13:56:46.030504920Z 2025-01-13T13:56:48.524949182Z
44812407-c4ac-4a08-a4c0-e79e50c3389a SUCCEEDED 2025-01-13T13:56:46.028397865Z 2025-01-13T13:57:08.549466843Z

1つのワークフロー実行だけSUCCEEDEDとなっており実行に成功しています。また、各ワークフロー実行のStart Time(開始時間)を見るとミリ秒レベルでの起動となっているのが確認できます。
ミリ秒レベルで同時に起動されても、多重起動判定を行い1つの処理だけ実行することができている様子が上記の結果よりわかります。いやーよかった。

注意

この実装では、ミリ秒レベルの多重起動にも対応できると考えています。が、同じワークフローを呼び出し引数を変えて実行する場合は対応できません。
正しい呼び出しも多重起動扱いしてしまいます。
スクリーンショット 2025-01-13 23.13.52.png

このような場合はList APIを用いた判定だけでは難しいと考えます・・・
この場合はFirestoreなど外部の記録媒体に呼び出し引数などをセットで記録してそれぞれのグループごとに多重起動判定をする必要があると考えます。

おわりに

今回実装したような多重起動判定用のCloud Run Functions関数をWorkflowsのワークフローの最初の方のステップに実装し判定することでワークフローの多重起動を防ぐことが可能になると思われます。
他にもいろいろな方法があると思いますし、今回の方法もベストとは言い難い方法とも思えています。そしてもっといろいろな方法を探ってみようと思っています。

それではまた、ナマステー

参考

https://cloud.google.com/python/docs/reference/workflows/latest/google.cloud.workflows.executions_v1.services.executions.ExecutionsClient#google_cloud_workflows_executions_v1_services_executions_ExecutionsClient_list_executions

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.